# coding:utf-8

# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT

import json
import os
import sys
import time
from typing import Optional

from clcommon.utils import run_command, ExternalProgramFailed
from clcommon.clfunc import memory_to_page, page_to_memory
from clcommon.clexception import FormattedException

VALUES_STR = 'Available values for option'
_LIMITS_LOWER_BOUNDS_CONFIG_FILE = '/etc/sysconfig/limits_lower_bounds'


class LowerBoundsConfigException(FormattedException):
    """Raised when the lower bounds config file exists but cannot be read."""


def _read_lower_bounds_config() -> dict:
    """
    Read all lower bounds from the config file.

    Blank lines, lines starting with '#' and lines without '=' are skipped.
    Each remaining line is split on the first '='; key and value are stripped.

    :return: dict mapping lower bound names to their values (both strings);
             empty dict when the config file does not exist
    :raises LowerBoundsConfigException: If the file exists but cannot be read
                                        or is not valid UTF-8 text
    """
    # A missing file simply means that no lower bounds are configured
    if not os.path.exists(_LIMITS_LOWER_BOUNDS_CONFIG_FILE):
        return {}

    lower_bounds = {}
    try:
        with open(_LIMITS_LOWER_BOUNDS_CONFIG_FILE, 'r', encoding='utf-8') as f:
            for line in f:
                line = line.strip()
                if line and '=' in line and not line.startswith('#'):
                    key, value = line.split('=', 1)
                    lower_bounds[key.strip()] = value.strip()
    except (OSError, UnicodeDecodeError) as e:
        # UnicodeDecodeError is not an OSError: without catching it a file
        # containing invalid UTF-8 bytes (i.e. a really corrupted file) would
        # escape as an unhandled exception instead of the documented one.
        raise LowerBoundsConfigException({
            'message': "Failed to read limits' lower bounds configuration from %(file)s. "
                       "The file may be corrupted. Error: %(error)s",
            'context': {
                'file': _LIMITS_LOWER_BOUNDS_CONFIG_FILE,
                'error': str(e)
            }
        }) from e

    return lower_bounds


def _write_lower_bounds_config(lower_bounds: dict) -> bool:
    """
    Write lower bounds to the config file, replacing its previous content.

    The parent directory is created when missing. Entries are written sorted
    by key, one ``KEY=value`` pair per line, after a short comment header.

    :param lower_bounds: dict mapping lower bound names to values
    :return: True on success, False if the file could not be written
    """
    try:
        os.makedirs(os.path.dirname(_LIMITS_LOWER_BOUNDS_CONFIG_FILE), exist_ok=True)
        with open(_LIMITS_LOWER_BOUNDS_CONFIG_FILE, 'w', encoding='utf-8') as f:
            f.write('# CloudLinux lower bounds configuration\n')
            f.write('# Format: RESOURCE_TYPE=value\n\n')
            for key, value in sorted(lower_bounds.items()):
                f.write(f'{key}={value}\n')
        return True
    except OSError:
        # Best effort by design: the caller only gets a success flag
        return False


def read_lower_bound(resource_type: str) -> Optional[int]:
    """
    Read lower bound for any resource type from the config file.

    :param resource_type: Type of resource (e.g., 'PMEM', 'VMEM');
                          looked up in upper case
    :return: whatever memory_to_page() yields for the configured value
             (lower bound in pages), or None if the bound is not set or empty
    :raises LowerBoundsConfigException: If config file is corrupted
    """
    lower_bounds = _read_lower_bounds_config()
    limit_str = lower_bounds.get(resource_type.upper())
    if not limit_str:
        return None
    return memory_to_page(limit_str)


def write_lower_bound(resource_type: str, bound_value: str) -> bool:
    """
    Write lower bound for any resource type to the config file.

    The existing configuration is read first, so bounds of other resource
    types are preserved.

    :param resource_type: Type of resource (e.g., 'PMEM', 'VMEM');
                          stored in upper case
    :param bound_value: Lower bound value as string (e.g., '512M', '1G')
    :return: True if the config file was written, False otherwise
    :raises LowerBoundsConfigException: If config file is corrupted
    """
    lower_bounds = _read_lower_bounds_config()
    lower_bounds[resource_type.upper()] = bound_value
    return _write_lower_bounds_config(lower_bounds)


def check_pmem_lower_bound(pmem_pages: int) -> Optional[dict]:
    """
    Check if PMEM value is below the configured lower bound.

    A value of 0 is never reported (presumably 0 means "unlimited" -
    verify against callers).

    :param pmem_pages: PMEM value in pages
    :return: Error dict with 'message' and 'context' keys when the value is
             below the bound or the config cannot be read, otherwise None
    """
    try:
        lower_bound_pages = read_lower_bound('PMEM')
    except LowerBoundsConfigException as e:
        # Report the config problem in the same dict form as a violation
        return {'message': e.message, 'context': e.context}

    if lower_bound_pages is None or pmem_pages == 0 or pmem_pages >= lower_bound_pages:
        return None

    lower_bound = page_to_memory(lower_bound_pages)
    return {
        'message': (
            'PMEM limit is below configured lower bound of %(bound)s. '
            'This may cause application instability and performance issues.'
        ),
        'context': {'bound': lower_bound}
    }


def check_pmem_lower_bound_from_string(pmem_value: str) -> Optional[dict]:
    """
    Check if PMEM value (as string) is below the configured lower bound.

    :param pmem_value: PMEM value as string (e.g., "5m", "512M", etc.)
    :return: Error dict with 'message' and 'context' or None; also None when
             memory_to_page() returns None for the given string
    """
    pmem_pages = memory_to_page(pmem_value)
    if pmem_pages is None:
        return None
    return check_pmem_lower_bound(pmem_pages)


def replace_params(data):
    """
    Replace concrete parameters in an error text with format placeholders.

    * ``"--option rest of text"`` -> ``"%(param)s rest of text"`` with the
      option name in the context;
    * ``"Available values for option...: a, b"`` ->
      ``"...: %(available_options)s"`` with the stripped value list in the
      context;
    * anything else is returned unchanged, without a context.

    :param data: error text to prepare for showing
    :return: dict with 'result' and, when something was replaced, 'context'
    """
    if data.startswith("--"):
        # partition() never fails, unlike unpacking split(" ", 1), when the
        # text consists of the option name only
        param, separator, text = data.partition(" ")
        template = "%(param)s " + text if separator else "%(param)s"
        return {"result": template, "context": {"param": param}}
    if data.startswith(VALUES_STR):
        text, separator, param = data.partition(":")
        if separator:
            return {"result": text + ": %(available_options)s",
                    "context": {"available_options": param.strip()}}
    return {"result": data}


def _is_string_number(s_val):
    """
    Check whether a string holds a number (integer or float).

    :param s_val: String to check
    :return: True - float() accepts the string, False - it raises ValueError
    """
    try:
        float(s_val)
    except ValueError:
        return False
    return True


def convert_mem_value_to_bytes(value):
    """
    Convert a memory value with an optional K/M/G suffix to bytes.

    The suffix is case-insensitive. A bare number is treated as kilobytes.

    :param value: value of mem limit (e.g. '512M', '1g', 2048)
    :return: value in bytes (int)
    :raises ValueError: if the value is neither a number nor a number with
                        a K/M/G suffix
    """
    powers = {'k': 1, 'm': 2, 'g': 3}
    text = str(value).lower()
    suffix = text[-1:]
    if suffix in powers:
        power = powers[suffix]
        number = text[:-1]
    elif _is_string_number(text):
        # No unit given: the number is in kilobytes
        power = 1
        number = text
    else:
        raise ValueError('Wrong memory value')
    return int(1024 ** power * float(number))


def _convert_memory_value_to_adaptive_format(value, convert=True):
    """
    Convert memory value to the largest fitting unit (K, M, G, T or P).

    The input is taken as megabytes when it ends with 'm' (any case) and as
    kilobytes otherwise. A leading '*' is kept in front of the result.
    Zero is always rendered as '0K'. Values that exceed the largest unit are
    expressed in that unit (e.g. '2048.00P').

    :param value: memory value in MB or KB
    :param convert: if True - convert value, False - return it untouched
    :return: adaptive value with two decimals and a unit, e.g. '1.50G'
    """
    if not convert:
        return value

    text = str(value).lower()
    if text.endswith('m'):
        units = ['M', 'G', 'T', 'P']
    else:
        units = ['K', 'M', 'G', 'T', 'P']
    text = text.replace('m', '').replace('k', '')  # remove unit symbol

    prefix = ''
    if text.startswith('*'):
        prefix = '*'
        text = text.replace('*', '')

    amount = float(text)
    if amount == 0:
        return f'{prefix}0K'

    # Climb the unit ladder while the amount is at least 1024 of the current
    # unit, but never past the last unit: previously such a value fell out of
    # the loop unformatted and an empty string was returned.
    unit_index = 0
    last_index = len(units) - 1
    while unit_index < last_index and amount // 1024 > 0:
        amount /= 1024
        unit_index += 1
    return f'{prefix}{amount:.2f}{units[unit_index]}'


def print_dictionary(data_dict, is_json=False):
    """
    Print specified dictionary to stdout.

    :param data_dict: data dictionary to print
    :param is_json: True - print as JSON with sorted keys,
                    False - print the plain text representation
    :return: None
    """
    if is_json:
        print(json.dumps(data_dict, sort_keys=True))
    else:
        print(data_dict)


def print_error_and_exit(message, is_json=False):
    """
    Print an error with the current timestamp to stdout and exit with code 1.

    :param message: error to report; converted to string
    :param is_json: True if print error in json format, False - text
    """
    data = {"timestamp": time.time(), "result": str(message)}
    print_dictionary(data, is_json)
    sys.exit(1)


def is_quota_supported(cl_quota_path, repquota_path):
    """
    Detect whether quota is supported, i.e. both needed utilities are present.

    :param cl_quota_path: path to the cl-quota utility
    :param repquota_path: path to the repquota utility
    :return: True/False - quotas supported/not supported
    """
    return os.path.isfile(cl_quota_path) and os.path.isfile(repquota_path)


def is_quota_active(cl_quota_path, repquota_path):
    """
    Detect whether quota is activated.

    :param cl_quota_path: path to the cl-quota utility
    :param repquota_path: path to the repquota utility
    :return: True/False - quotas activated/not activated
    """
    # If quotas are not supported they are not activated
    if not is_quota_supported(cl_quota_path, repquota_path):
        return False

    try:
        stdout = run_command([repquota_path, '-nva'])
    except ExternalProgramFailed:
        return False

    # Quotas are not activated if repquota returns nothing
    return bool(stdout)